-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Live Stream processing support #23
Conversation
1f19410
to
1e9dbfb
Compare
- added license headers to java files - added bidirectional gRPC live streaming blocks implementation with 1 producer and N consumers Signed-off-by: Matt Peterson <[email protected]>
1e9dbfb
to
0608e65
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great start, happy to see the PR! Another thing I'm missing is some kind of design documentation to give some orientation around what the different components are.
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few suggestions and a question or two.
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/BlockStreamService.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/cache/BNLinkedHashMap.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
…k. added tests" Signed-off-by: Matt Peterson <[email protected]>
server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java
Outdated
Show resolved
Hide resolved
…re line to prevent that from being added Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
…onsumers Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
…of the proto file Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
Hey @rbair23, given the expected refactoring of this approach, do you mind if I submit the design doc with the PR for that effort? |
server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, only a few nits...
buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts
Outdated
Show resolved
Hide resolved
Signed-off-by: Matt Peterson <[email protected]>
Signed-off-by: Matt Peterson <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work.
Some considerations for future PRs and backlog tickets
|
||
static void grpcEcho(EchoServiceGrpcProto.EchoRequest request, StreamObserver<EchoServiceGrpcProto.EchoResponse> responseObserver) {} | ||
// TODO: Make timeoutThresholdMillis configurable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please turn this into a ticket on the backlog. Thanks
/** | ||
* Constants used in the BlockNode service. | ||
*/ | ||
public final class Constants { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Would ServiceContracts be more appropriate.
I feel like this is one of those files that will get overloaded with many values over time
|
||
static void grpcEcho(EchoServiceGrpcProto.EchoRequest request, StreamObserver<EchoServiceGrpcProto.EchoResponse> responseObserver) {} | ||
// TODO: Make timeoutThresholdMillis configurable | ||
final BlockStreamService blockStreamService = new BlockStreamService(1500, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1500 should be a configureable value
|
||
// Start the web server | ||
WebServer.builder() | ||
.port(8080) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expose as a configureable value
this.mediator = mediator; | ||
this.responseStreamObserver = responseStreamObserver; | ||
|
||
this.producerLivenessMillis = producerLivenessClock.millis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: This is the part you noted helidon doesn't handle itself huh so we need to implement our own custom solution to determine timeouts. Sigh.
We should open an issue on helidon for this.
if we're luck it might get resolved and we can remove this logic in the future
*/ | ||
@Override | ||
public void unsubscribe(final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver) { | ||
if (subscribers.remove(liveStreamObserver)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the removal fails is there some backup step we should take?
Does this hold up a slot that is unclaimable?
Should we at least warn or something?
} | ||
|
||
// Persist the block | ||
blockPersistenceHandler.persist(block); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Should we consider splitting out persistence into it's own StreamMediator or something appropriate?
I don't like the idea that the block doesn't get persisted if there are any issues streaming to subscribers.
Even if we don't pull it out we should persist first.
private String resolvePath(final Long id) { | ||
|
||
String fileName = id + BLOCK_FILE_EXTENSION; | ||
Path fullPath = blockNodeRootPath.resolve(fileName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later item, we'll wanna explore a directory structure to avoid having one flat dir with all files.
Maybe some multiple to help map
Description:
Fixes #16
Fixes #17
Fixes #18
Fixes #21
First draft of #27
Notes for reviewer:
Checklist